前端對於WebSocket
這項技術應該不陌生,以往會需要使用輪詢的方式更新資料,目前大多改採用WebSocket
連線來建立雙向溝通的需求。
我們使用套件 web_socket_channel 這個 WebSocket
Library 處理客端的服務請求,使用方式很簡單
IOWebSocketChannel.connect(Uri.parse('WebSocket_Server_URL'));
channel.stream.listen((message) {
print(message);
});
channel.sink.add('received!');
channel.sink.close();
讓我們建立一個即時通訊的聊天室吧
我們使用 nodejs 套件 ws
完成 server 端的設定,在建立連線時從 websocket
的 query string 取得 token
參數,可以透過這邊方式進行身份的驗證以確認是否可以建立連線。
import WebSocket, { WebSocketServer } from "ws";
import { UniqueID } from "nodejs-snowflake";
import { createServer } from "http";
import url from "url";
const uid = new UniqueID();
const server = createServer();
const wss = new WebSocketServer({
noServer: true,
});
class User {
constructor(name) {
this.name = name;
}
}
function authenticate(request, callback) {
const { query } = url.parse(request.url, true);
if (!query.token) {
// 身分驗證
callback(new Error("token is no defined"));
return;
}
callback(null, new User(query.token));
}
wss.on("connection", function connection(ws, request, user) {
ws.on("message", function message(msg) {
console.log(`Received message from user ${user.name}`);
var data = msg.toString();
var json
try {
json = JSON.parse(data);
} catch (e) {
console.error(e.toString());
json = {};
}
// 根據通訊協議的格式設定處理聊天室功能
if (json.eventName === "chat:send") {
const obj = { eventName: "chat:msg", mid: uid.getUniqueID(), by: user.name, msg: json.data, time: Date.now() };
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(obj));
}
});
}
});
});
server.on("upgrade", function upgrade(request, socket, head) {
authenticate(request, (err, client) => {
if (err || !client) {
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
socket.destroy();
return;
}
wss.handleUpgrade(request, socket, head, function done(ws) {
wss.emit("connection", ws, request, client);
});
});
});
server.listen(8000);
為了一些業務邏輯的處理,我們使用 Connection
包裝 IOWebSocketChannel
類別,讓它可以處理斷線時的重連機制。WebSocket
訊息是透過 Dart stream
接口,這邊可能需要先理解一下 stream
的概念 為何。
IOWebSocketChannelstream
跟以往學習RxJS
概念很相近,因此我這邊另外使用了 Rxdart
套件處理訊息的轉發,這樣在斷線重線時不會影響到已綁定的事件行為。
class Connection {
bool _connected = false;
Uri uri;
late IOWebSocketChannel _channel;
late StreamSubscription _subscription;
PublishSubject<dynamic> stream = PublishSubject();
BehaviorSubject<bool> connected = BehaviorSubject();
Connection({required this.uri}) {
_connect();
}
_connect() {
if (_connected) {
_subscription.cancel();
}
_channel = IOWebSocketChannel.connect(uri);
_connected = true;
_subscription =
_channel.stream.listen(_onMessage, onError: _onError, onDone: _onDone);
connected.add(_connected);
}
void reconnect({Uri? uri}) {
if (uri != null) {
this.uri = uri;
}
_connect();
}
trigger(dynamic message) {
_channel.sink.add(json.encode(message));
}
void _onMessage(dynamic message) {
stream.add(jsonDecode(message));
}
void _onError(e) {
stream.addError(e);
}
void _onDone() {
_connected = false;
connected.add(_connected);
}
}
我們使用聊天室的 ViewModel 處理接收到的訊息,這邊定義聊天室訊息的格式, 當接收到 eventName
為 chat:msg
,會將該訊息轉成 Message 資料類別,存放到 data
內,接著透過 Rxdart
的 PublishSubject
發送出去。
class ChatViewModel {
Connection connection =
Connection(uri: Uri.parse('ws://test.dev.rde:8000/?token=sm'));
PublishSubject<List<Message>> stream = PublishSubject();
final List<Message> data = [];
ChatViewModel() {
connection.stream
.where((data) => data["eventName"] == "chat:msg")
.map((data) => Message.fromJson(data))
.listen(_addData);
}
void _addData(Message msg) async {
// 存放訊息資料
data.add(msg);
// 發送訊息通知給 StreamBuilder
stream.add(data);
}
我們使用 StreamBuilder
接收發送過來的 List<Message>
資料,並使用 ListView.builder
動態創建訊息欄位。
今日學習 WebSocket
技術的應用方式,遇到比較不熟悉的應該是在 stream
的使用上,後續可能要花點時間練習語法要怎麼使用,至於想理解 RxJS
可以看看 黃升煌
大大 第12屆的鐵人賽